package com.hao.chapter11;

import com.hao.chapter05.ClickSource;
import com.hao.chapter05.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import static org.apache.flink.table.api.Expressions.$;

public class QueryAndOutput {

    /**
     * Demo job: ingests a stream of {@link Event} click records, exposes it as a
     * table, queries it twice — once via SQL, once via the Table API — and routes
     * the two results to a filesystem (CSV) sink and a print (stdout) sink.
     */
    public static void main(String[] args) throws Exception {
        // Streaming environment with a single parallel task so output ordering is easy to read.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // Ingest click events from the custom source as a DataStream.
        DataStreamSource<Event> clicks = streamEnv.addSource(new ClickSource());

        // Bridge the DataStream world to the Table API.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // Convert the POJO stream into a table, renaming the "user" field to "user_name".
        Table clickTable = tEnv.fromDataStream(clicks, $("user").as("user_name"), $("url"));

        // Register the table under a name so SQL queries can reference it.
        tEnv.createTemporaryView("clickTable", clickTable);

        // Query 1: plain SQL against the registered view.
        Table sqlResult = tEnv.sqlQuery("select `user_name` , url from clickTable");

        // Query 2: the equivalent projection expressed with the Table API.
        Table apiResult = clickTable.select($("user_name"), $("url"));

        // Sink table 1: CSV files under the local "output" directory.
        tEnv.executeSql(
                "create table outTable("
                        + "user_name STRING,"
                        + "url STRING"
                        + ")WITH("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 'output',"
                        + " 'format' = 'csv' "
                        + ")");

        // Sink table 2: stdout, handy for debugging.
        tEnv.executeSql(
                "create table printTable("
                        + "user_name STRING,"
                        + "url STRING"
                        + ")WITH("
                        + " 'connector' = 'print'"
                        + ")");

        // Submit both insert jobs: SQL result -> file sink, Table API result -> print sink.
        // (executeInsert triggers job submission itself; no explicit env.execute() needed.)
        sqlResult.executeInsert("outTable");
        apiResult.executeInsert("printTable");
    }
}
